1. 背景
在实际开发过程中遇到了客户购买物联网平台能力的需求,因为客户购买操作在增值服务平台,能力开通在我们团队的应用中,常规的方案是在购买成功后发送 Kafka 消息,由我们团队订阅消息,完成后续的操作。
由于涉及到用户付款操作,所以需要保证消息不丢失。
2. 哪些步骤可能会丢消息
2.1 生产者丢消息
Kafka Producer 其实是异步发送消息的,如果你调用的是 producer.send(msg) 这个 API,那么它通常会立即返回,但此时你不能认为消息发送已成功完成。
该方法只是将消息写入 Buffer Pool 中,会由后台线程批量将消息发送给 Broker,如果消息还未发送,此时宕机,Buffer Pool 中的数据会丢失;如果在后台线程发送给 Broker 的过程中失败了,业务中同样无法精确的感知到。
推荐使用带回调函数的 send 方法,一旦出现消息提交失败的情况,我们可以有针对性地进行处理。
部分示例代码:
1 | kafkaProducer.send(new ProducerRecord(TOPIC, i + "", i + ""), new Callback() { |
如果回调函数中异常不为空,需要根据异常的类型进行相应的处理,如果是消息体有问题(格式不对或者大小越界),证明 Producer 的代码有问题,消息需要重新定义;如果是 Broker 网络问题,可以进行重试。
针对 exception 的处理可以通过日志或者 trace 进行记录,方便监控和告警。
2.2 Broker丢消息
对于 Broker,一句话概括,Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。
对这句话做以下解释:
- 已提交消息:当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。
- 若干个 Broker:这个取决于 Producer 和 Broker 的相关配置,其中,Producer 可以通过
acks
的配置指定了是否 Broker 是否写日志文件以及同步数据到 Follower;Broker 可以通过replication.factor
的配置指定副本个数,通过min.insync.replicas
的配置指定需要同步多少副本才算是已提交消息。
当然,如果出现 Kafka 集群中的所有 Broker 同时全部宕机这种极端情况,消息还是有丢失的风险。所以,Kafka 只做有限度的持久化保证。
2.3 消费者丢消息
消费者丢消息主要是自动 ack 。
3. 生产环境无消息丢失配置
配置涉及到 Producer、Broker 以及 Consumer。
3.1 Producer
- 不要使用 producer.send(msg),推荐使用 producer.send(msg, callback)
- 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”
- 设置 retries 为一个较大的值
3.2 Broker 端
在配置之前,需要和负责 Kafka 的中间件同学沟通,拿到目前 Broker 的相关参数配置,如果不符合你的需求,看是否能做相应的调整。
- 设置 unclean.leader.election.enable = false,不允许非 ISR 中的副本被选举为 Leader,如果一个 Broker 数据落后 Leader 太多,那么一旦它成为新的 Leader,必然会造成数据丢失
- 设置 replication.factor >= 3,该参数决定副本数量,最好将消息多保存几份,增加冗余
- 设置 min.insync.replicas > 1,该参数定义消息至少要被写入到多少个副本才算是“已提交”,推荐 replication.factor = min.insync.replicas + 1
我咨询了我司中间件同学,replication.factor 设置值为 3,min.insync.replicas 设置值为 2,unclean.leader.election.enable 设置值为 false,因此是满足无消息丢失的配置的。
3.3. Consumer
- 设置 enable.auto.commit=false,手动 ack
3.4 小提醒
以我这些年和中间件打交道的经验来看,除非是 Kafka 服务端的配置十分不合理,否则要想说服中间件的同学改 Broker 的配置是十分困难的。
因此,如果没办法保证整个 Kafka 的配置都满足无消息丢失的完美配置,需要针对某些可能丢消息的场景做兜底方案,比如系统对账甚至人工介入等。
Kafka 客户端详细的配置可参见之前的文章——Kafka调优与详细参数说明 。
4. 参考
《Apache Kafka 实战》